Skip to content

Added NNG broker.#607

Open
s3rius wants to merge 1 commit intomasterfrom
feature/nng
Open

Added NNG broker.#607
s3rius wants to merge 1 commit intomasterfrom
feature/nng

Conversation

@s3rius
Copy link
Copy Markdown
Member

@s3rius s3rius commented Apr 11, 2026

Ref: #602.

@codecov
Copy link
Copy Markdown

codecov bot commented Apr 11, 2026

Codecov Report

❌ Patch coverage is 0% with 22 lines in your changes missing coverage. Please review.
✅ Project coverage is 78.30%. Comparing base (02700d4) to head (2fd7a3c).

Files with missing lines Patch % Lines
taskiq/brokers/nng_broker.py 0.00% 22 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master     #607      +/-   ##
==========================================
- Coverage   79.00%   78.30%   -0.70%     
==========================================
  Files          69       70       +1     
  Lines        2463     2485      +22     
==========================================
  Hits         1946     1946              
- Misses        517      539      +22     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.


async def kick(self, message: BrokerMessage) -> None:
"""Send a message."""
await self.socket.ascend(message.message)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The correct pynng method for async sending is asend(). Currently, this will raise an AttributeError on the very first task dispatch.

Copy link
Copy Markdown

@alexted alexted left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new NNGBroker is not exported in taskiq/__init__.py and taskiq/brokers/__init__.py. We need to export it to keep the public API consistent with ZeroMQBroker and others, so users can just do from taskiq import NNGBroker.

Copy link
Copy Markdown

@alexted alexted left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is a core networking component, we strictly need at least basic integration tests (e.g., in tests/brokers/) to verify startup, kick, listen, and shutdown behaviors.

Comment on lines +31 to +34
if self.is_worker_process:
self.socket.listen(self.addr)
else:
self.socket.dial(self.addr, block=True)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NNG (like most brokers) can be susceptible to slow joiner syndrome during dial and listen. If we kick a task immediately after dial, the message might be dropped or cause an error before the connection is fully established. To make the initial transmission more robust, should we consider adding a brief timeout or a socket state check?
Just thinking out loud - pure theory.

implementation.
"""

def __init__(self, addr: str) -> None:
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The addr parameter is accepted but remains unvalidated. Given that Taskiq relies heavily on Pydantic, it would be fitting to add basic validation for the connection string (e.g., checking for ipc:// or tcp:// prefixes).

@@ -0,0 +1,48 @@
from collections.abc import AsyncGenerator

import pynng
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The dependency is added as an extra in pyproject.toml, but the code is missing a guard import.

try:
    import pynng
except ImportError:
    raise ImportError("Install 'taskiq[nng]' to use NNGBroker.")

Without it the user will get a confusing ModuleNotFoundError only when they try to initialize the class.

:param addr: address which is used by both worker and client.
"""
super().__init__()
self.socket = pynng.Pair1(polyamorous=True)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a fundamental architectural bottleneck that needs to be addressed to make this broker production-ready.

The current use of pynng.Pair1 with listen() inside the worker process forces the -w 1 limitation. In a distributed system, if multiple worker processes try to bind (listen) to the same port, it leads to address conflicts or, worse, duplicated task execution (where every worker picks up the same message). This is exactly why the existing ZeroMQ broker in taskiq is limited to a single worker.

The Solution: Router/Dealer + Proxy Device
To unlock horizontal scaling (-w 100) and bi-directional feedback (metrics, statuses), we should move to a centralized Proxy topology using Router/Dealer sockets.

  • The Proxy (Device): A lightweight mediator that listen()s on two ports (Frontend for clients, Backend for workers). It uses pynng.device() to pump messages between them with native NNG Round-Robin load balancing.

  • The Clients & Workers: Both become purely outbound - they only dial() the proxy. This eliminates port conflicts between worker processes.

Proposed Implementation
The broker should automatically attempt to start the proxy if it's running locally, or simply connect to it.

Broker Core Logic:

import pynng
import asyncio
import threading

class NNGBroker(AsyncBroker):
    async def startup(self) -> None:
        # 1. Try to start the internal proxy device 
        # (Only the first process succeeds, others will fail silently if port is bound)
        self._start_proxy_if_needed()

        if self.is_worker_process:
            # Workers connect to the backend
            self.socket = pynng.Dealer0(dial=self.worker_url)
        else:
            # Clients connect to the frontend
            self.socket = pynng.Dealer0(dial=self.client_url)

    def _start_proxy_if_needed(self):
        try:
            # Simple check/start logic for pynng.device
            # This allows horizontal scaling without manual proxy management
            def run_device():
                with pynng.Router0(listen=self.client_url) as front, \
                     pynng.Dealer0(listen=self.worker_url) as back:
                    pynng.device(front, back)
            
            threading.Thread(target=run_device, daemon=True).start()
        except pynng.AddressInUse:
            pass # Proxy is already running

Worker Feedback:

async def listen(self):
    async for message in self.socket:
        # Asynchronous feedback is now possible!
        await self.socket.asend(b"Status: Processing") 
        yield message

By adopting the Router/Dealer + Proxy pattern, we overcome the limitations of the ZeroMQ implementation and provide a truly scalable NNG transport for the ecosystem.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants